001 /* 002 * LSFDispatcher.java 003 * 004 * Created on December 23, 2002, 11:30 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.lsf; 024 025 import gov.bnl.star.offline.scheduler.ComponentLibrary; 026 import gov.bnl.star.offline.scheduler.Dispatcher; 027 import gov.bnl.star.offline.scheduler.Job; 028 import gov.bnl.star.offline.scheduler.Request; 029 import gov.bnl.star.offline.scheduler.catalog.PhysicalFile; 030 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 031 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH 032 import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition; 033 034 035 import java.util.List; 036 import java.util.logging.Level; 037 import java.util.logging.Logger; 038 039 040 041 /** Dispatches a job using LSF. 042 * <p> 043 * For each process, two files are created: a script for the execution and 044 * a text file containing the file list. The script basically sets the 045 * environment variables and executes the command line. The file list 046 * contains the input file requested, one full path for each line in the list. 047 * <p> 048 * Each script is submitted through bsub. 049 * <p> 050 * The simulation flag will make the scheduler not actually execute the command 051 * lines. Therefore scripts and fileLists are created, but the bsub and chmod 052 * commands are not executed. Log and output won't be affected, except that there 053 * will be a message warning that the submission is simulated. 054 * 055 * @author Gabriele Carcassi, Jerome Lauret 056 * @version 1.0 2002/12/26 057 */ 058 public class LSFDispatcher implements Dispatcher { 059 static private Logger log = Logger.getLogger(LSFDispatcher.class.getName()); 060 private String resourceStrategy; 061 protected String scratchDir; 062 private String bsubEx; 063 protected boolean simulation = false; 064 private String queueName; 065 private String bsubOptions; 066 private int maxBsubAttempts; 067 private int msBtwnSuccess; 068 private int msBtwnFailure; 069 private String ResReqDefinitionObj; 070 071 public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){ 072 this.ResReqDefinitionObj = ResReqDefinitionObj; 073 074 } 075 076 public String getScratchDir() { 077 return scratchDir; 078 } 079 080 public void setScratchDir(String scratchDir) { 081 this.scratchDir = scratchDir; 082 } 083 084 public String getBsubEx() { 085 return bsubEx; 086 } 087 088 public void setBsubEx(String bsubEx) { 089 this.bsubEx = bsubEx; 090 } 091 092 public String getQueueName() { 093 return queueName; 094 } 095 096 public void setQueueName(String queueName) { 097 this.queueName = queueName; 098 } 099 100 public String getBsubOptions() { 101 return bsubOptions; 102 } 103 104 public void setBsubOptions(String bsubOptions) { 105 this.bsubOptions = bsubOptions; 106 } 107 108 public int getMaxAttempts() { 109 return maxBsubAttempts; 110 } 111 112 public void setMaxAttempts(int maxAttempts) { 113 this.maxBsubAttempts = maxAttempts; 114 } 115 116 public int getMsBtwnSuccess() { 117 return msBtwnSuccess; 118 } 119 120 public void setMsBtwnSuccess(int msBtwnSuccess) { 121 this.msBtwnSuccess = msBtwnSuccess; 122 } 123 124 public int getMsBtwnFailure() { 125 return msBtwnFailure; 126 } 127 128 public void setMsBtwnFailure(int msBtwnFailure) { 129 this.msBtwnFailure = msBtwnFailure; 130 } 131 132 protected boolean reportedFailure; 133 protected CSHApplication application; 134 private String resSwitch; 135 136 /** Creates the scripts and dispatches the job on the target machine. 137 * @param request the job request 138 */ 139 public void dispatch(Request request, List jobs) { 140 log.info("Dispatching using LSF: \"" + request.getCommand() + "\""); 141 142 // Enables the simulation mode if necessary 143 useSimulationMode(request.getSimulation()); 144 reportedFailure = false; 145 146 // Submits from the higher to the lower JobID. This way the 147 // user has a feel of when the last job is going to be 148 // submitted 149 for (int nJob = jobs.size() - 1; nJob >= 0; 150 nJob--) { 151 Job job = (Job) jobs.get(nJob); 152 153 System.out.print("Dispatching process " + 154 job.getJobID() + "."); 155 dispatch(request, job); 156 if (getClusterName() != null) job.setCluster(getClusterName()); 157 } 158 159 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH 160 } 161 162 /* Enables or disables the simulation mode. The simulation mode will deactivate 163 * every command line execution. 164 */ 165 protected void useSimulationMode(boolean simulation) { 166 this.simulation = simulation; 167 168 if (simulation) { 169 // Warn the user that we are entering simulated submission mode 170 log.warning("Simulating submission"); 171 System.out.println("Simulating submission"); 172 } 173 } 174 175 protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) { 176 reportFailure(job); 177 System.out.println("Process number " + jobNumber + " wasn't submitted."); 178 System.out.println(message); 179 System.out.println(); 180 System.out.println("The process input file were:"); 181 182 List list = job.getInput(); 183 184 for (int nFile = 0; nFile < list.size(); nFile++) { 185 PhysicalFile file = (PhysicalFile) list.get(nFile); 186 System.out.println(" - " + file.getPath() + "/" + 187 file.getFilename()); 188 } 189 } 190 191 protected void reportFailure(Job job) { 192 if (!reportedFailure) { 193 System.out.println("There were some errors during job submission."); 194 System.out.println("Some processes weren't submitted:"); 195 } 196 } 197 198 /** Currently not implemented 199 * @param request the job for which to retrieve the output 200 */ 201 public void retrieveOutput(Request request, List jobs) { 202 } 203 204 /* Dispatches a single process of a job request. 205 */ 206 protected void dispatch(Request request, Job job) { 207 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 208 209 // TODO: all the parameters should be passed in one go 210 application.setJob(request, job); 211 application.setScratchDir(scratchDir); 212 application.setSubmissionCommand(getBsubCommand(request, job)); 213 214 application.prepareJob(); 215 216 log.info("Executing \"" + getBsubCommand(request, job) + "\""); 217 218 if (!simulation) { 219 try { 220 Thread.sleep(msBtwnSuccess); 221 } catch (Exception e) { 222 } 223 224 int attempt = 0; 225 boolean success = false; 226 String pe=""; 227 228 while (!success && (attempt < maxBsubAttempts)) { 229 try { 230 CSHCommandLineTask task = new CSHCommandLineTask(getBsubCommand( 231 request, job), true, 30000); 232 task.execute(); 233 234 if (task.getExitStatus() != 0) { 235 log.warning("bsub failed: " + task.getOutput()); 236 Thread.sleep(msBtwnFailure); 237 System.out.print("/"); 238 attempt++; 239 } else { 240 success = true; 241 } 242 } catch (Exception e) { 243 log.log(Level.SEVERE, "Couldn't submit the script to LSF", e); 244 // JL: write also to STDOUT to help users debug what happened 245 if (pe == ""){ 246 pe = e.toString(); 247 System.out.print("Couldn't submit the script to LSF" + pe); 248 } 249 try { 250 Thread.sleep(msBtwnFailure); 251 } catch (Exception e1) { 252 } 253 254 System.out.print("/"); 255 attempt++; 256 } 257 } 258 259 if (success) { 260 System.out.println(" done."); 261 } else { 262 System.out.println(" FAILED!!"); 263 } 264 } else { 265 System.out.println(" simulated."); 266 } 267 } 268 269 /** Returns the full bsub command to be executed to dispatch the process. This 270 * command must executed in the directory in which the script resides. 271 * @return the bsub command 272 */ 273 String getBsubCommand(Request request, Job job) { 274 StringBuffer bsub = new StringBuffer(bsubEx); 275 bsub.append(" -q ").append(getQueueName(job)); 276 277 if (job.getTarget() != null) { 278 bsub.append(" -m ").append(job.getTarget()); 279 } 280 281 if (application.getJobName() != null) { 282 bsub.append(" -J '").append(application.getJobName()).append("'"); 283 } 284 285 if (application.getStdin() != null) { 286 bsub.append(" -i ").append(application.getStdin()); 287 } 288 289 if (application.getStdout() != null) { 290 bsub.append(" -o ").append(application.getStdout()); 291 } 292 293 if (application.getStderr() != null) { 294 bsub.append(" -e ").append(application.getStderr()); 295 } 296 297 298 299 //This takes the GenericResourceRequirementStringDefinition (if any) and adds the ruage[] to it if the is any then it write out the string 300 GenericResourceRequirementStringDefinition lsfResReqDef = new GenericResourceRequirementStringDefinition(); 301 302 303 if(ResReqDefinitionObj != null) 304 lsfResReqDef = (GenericResourceRequirementStringDefinition) ComponentLibrary.getInstance().getComponent(ResReqDefinitionObj); 305 306 307 if ((getResourceUsageSwitch(job) != null)&&( lsfResReqDef.hasResourcesDefinition(job))) { 308 309 String Res = "\"(" + lsfResReqDef.makeString(job).replaceAll("\"", "").concat(") ").concat(getResourceUsageSwitch(job).replaceAll("\"", "")).concat("\""); 310 bsub.append(" -R ").append(Res); 311 } 312 else if(getResourceUsageSwitch(job) != null){ 313 bsub.append(" -R ").append(getResourceUsageSwitch(job)); 314 } 315 else if( lsfResReqDef.hasResourcesDefinition(job)){ 316 bsub.append(" -R ").append(lsfResReqDef.makeString(job)); 317 } 318 //////////////////////////lbh 319 /* 320 321 if (getResourceUsageSwitch(job) != null) { 322 bsub.append(" -R ").append(getResourceUsageSwitch(job)); 323 } 324 */ 325 bsub.append(' ').append(bsubOptions); 326 327 328 bsub.append(' ').append(application.getCommandLine()); 329 330 return bsub.toString(); 331 } 332 333 334 private LSFResourceStrategy lsfResourceStrategy; 335 336 /** Holds value of property clusterName. */ 337 private String clusterName; 338 339 public void setResourceStrategy(LSFResourceStrategy resourceStrategy) { 340 this.lsfResourceStrategy = resourceStrategy; 341 } 342 343 public LSFResourceStrategy getResourceStrategy() { 344 return lsfResourceStrategy; 345 } 346 347 protected String getResourceUsageSwitch(Job job) { 348 //FIXME: cache value 349 if (getResourceStrategy() == null) { 350 return null; 351 } 352 353 resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job); 354 355 return resSwitch; 356 } 357 358 protected String getQueueName(Job job) { 359 String queue = job.getQueue(); 360 361 if (queue == null) { 362 return queueName; 363 } 364 365 return queue; 366 } 367 368 /** Getter for property clusterName. 369 * @return Value of property clusterName. 370 * 371 */ 372 public String getClusterName() { 373 return this.clusterName; 374 } 375 376 /** Setter for property clusterName. 377 * @param clusterName New value of property clusterName. 378 * 379 */ 380 public void setClusterName(String clusterName) { 381 this.clusterName = clusterName; 382 } 383 384 public void Kill(Request request, List jobs) { 385 } 386 387 public String Status(Job job, int Processe) { 388 return "status unavailable"; 389 } 390 391 }